package com.example.demo.common;
import com.example.demo.entity.Column;
import com.example.demo.entity.JobTable;

import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.Date;

import lombok.extern.slf4j.Slf4j;

/**
 * @author guiguketang
 * @date 2020/11/13 15:27
 *
 */
@Slf4j
public class SynDataMain {
    public int syncDataByJobTable(JobTable jt) {
        Connection conn = null;
        Statement stmt = null;
        Connection connDes = null;
        Statement stmtDes = null;
        String start_time = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());

        try {
            //the information of source database;
            Class.forName("com.microsoft.sqlserver.jdbc.SQLServerDriver").newInstance();
            String url = "jdbc:sqlserver://" + jt.getDbip_read() + ":1433; DatabaseName=" + jt.getDbname_read()+"_bigdata_snap";
            String user = "erp_dp";
            String password = "3DD609B7-FED0-49FA-9F21-5E08A4F7609E";
            conn = DriverManager.getConnection(url, user, password);
            //conn.setAutoCommit(false);
            stmt = conn.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_UPDATABLE);
            log.info("connect to read db url is==== "+url);
            //the information of destination database;
            String urlDes = "jdbc:sqlserver://" + jt.getDbip_write() + ":1433; DatabaseName=" + jt.getDbname_write();
            String userDes = "erp_dp";
            String passwordDes = "3DD609B7-FED0-49FA-9F21-5E08A4F7609E";
            connDes = DriverManager.getConnection(urlDes, userDes, passwordDes);
            connDes.setAutoCommit(false);
            stmtDes = connDes.createStatement(ResultSet.TYPE_SCROLL_SENSITIVE, ResultSet.CONCUR_READ_ONLY);
            String des_sql = "insert into " + jt.getJob_table() + "(" + jt.getColumns() + ") values(";
            if (jt.getIs_identity() == 1) {
                System.out.println(" setting identity");
                stmtDes.addBatch("set IDENTITY_INSERT " + jt.getJob_table() + "  on");
            }
//            System.out.println("==The presql is:===" + jt.getPresql());
            String strPreSql[] = jt.getPresql().split(",");
            for (String sql : strPreSql) {
//                System.out.println("the preSql is>>>>"+sql);
                if (sql.length() != 0) {
                    stmtDes.addBatch(sql.replaceAll("'", ""));
                }
            }

            //获取同步表列数据类型；
            String columnSql = "SELECT a.colorder, a.name,b.name type " +
                    "FROM syscolumns a " +
                    "left join systypes b on a.xusertype = b.xusertype " +
                    "inner join sysobjects d on a.id = d.id  and d.xtype = 'U' and d.name <> 'dtproperties' " +
                    "left join syscomments e on a.cdefault = e.id " +
                    "left join sys.extended_properties g on a.id = G.major_id and a.colid = g.minor_id " +
                    "left join sys.extended_properties f on d.id = f.major_id and f.minor_id = 0 " +
                    "where d.name = '" + jt.getJob_table() + "' " +
                    "order by a.id,a.colorder";

            //取表第一列用于分页
            List<Column> columnList = new ArrayList<Column>();
            ResultSet rsColumn = stmtDes.executeQuery(columnSql);
            String row_id = "id";
            while (rsColumn.next()) {
                if(rsColumn.isFirst()){
                    row_id=rsColumn.getString("name");
                }
                Column columnObj = new Column();
                columnObj.setOrder(rsColumn.getInt("colorder"));
                columnObj.setName(rsColumn.getString("name"));
                columnObj.setType(rsColumn.getString("type"));
                //System.out.println("the object is:" + columnObj.toString());
                columnList.add(columnObj);
            }
            String base_sql = "select * from " + jt.getJob_table();
            String count_sql = "select count(*) from " + jt.getJob_table();
            //the total number of synchronizing table.
            int count = 0;
            ResultSet rsc = stmt.executeQuery(count_sql);
            while (rsc.next()) {
                count = rsc.getInt(1);
//                System.out.println("the total count is=========>" + count);
            }
            //暂时不支持大于500万数据的同步任务;
            if(count>5000000){
                log.info(jt.getJob_table()+"同步任务暂停，不支持五百万级以上表数据同步！");
                return 2;
            }


            //如果待同步表数据超过10万，则分批同步数据，batch start;
            int batch = 100000;
            for (int i = 0; i < count / batch + 1; i++) {
//                System.out.println("finally, the row_id is==>"+row_id);
                base_sql = "select top "+batch+" * from " +
                " (select row_number() over(order by "+row_id+") as rownumber,* from "+jt.getJob_table()+") temp_row"
                        +" where rownumber>"+batch*i;
                ResultSet rs = stmt.executeQuery(base_sql);
                //入库结果集处理;
                while (rs.next()) {
                    String insert_sql = "";
                    //拼接insert，如果是最后一列不加“,”，否则需要加逗号。
                    for (Column colObj : columnList) {
                        switch (colObj.getType()) {
                            case "ID":
                            case "bit":
                            case "int":
                            case "STATUS":
                            case "bigint":
                            case "smallint":
                            case "SHORTINT":
                            case "SMALL_INT":
                                insert_sql += columnList.get(columnList.size() - 1).equals(colObj) ? rs.getInt(colObj.getName()) : rs.getInt(colObj.getName()) + ",";
                                break;
                            case "numeric":
                            case "decimal":
                            case "DECIMAL1":
                                insert_sql += columnList.get(columnList.size() - 1).equals(colObj) ? rs.getBigDecimal(colObj.getName()) : rs.getBigDecimal(colObj.getName()) + ",";
                                break;
                            case "DATE":
                            case "datetime":
                            case "DATE_TIME":
                            case "DATETIME1":
                                String data_value="";
                                if(rs.getTimestamp(colObj.getName())==null){
                                    //do nothing!
                                }else{
                                    data_value=rs.getTimestamp(colObj.getName()).toString();
                                }
                                insert_sql += columnList.get(columnList.size() - 1).equals(colObj) ? "'" + data_value + "'" : "'" + data_value + "',";
                                break;
                            default:
                                String str_value="";
                                if(rs.getString(colObj.getName())==null){
                                    //do nothing!
                                }else{
                                    str_value=rs.getString(colObj.getName()).toString().replace("'","''");
                                }
                                insert_sql += columnList.get(columnList.size() - 1).equals(colObj) ? "'" + str_value + "'" : "'" + str_value + "',";
                        }
                    }
                    insert_sql = des_sql + insert_sql + ")";
                    stmtDes.addBatch(insert_sql);
                }
                //每20万条记录提交一次，防止GC overhead limit exceeded
                if(i>0 && (i+1)%2==0){
                    System.out.println("执行commit,the i is>>>>>"+i);
                    stmtDes.executeBatch();
                    connDes.commit();
                    stmtDes.clearBatch();
                    log.info("executing batch commit! 第"+(i+1)*10+"万条记录");
                }
            }
            //batch end;
            if (jt.getIs_identity() == 1) {
                System.out.println(" resetting identity");
                stmtDes.addBatch("set IDENTITY_INSERT " + jt.getJob_table() + " off");
            }
            if (jt.getPostsql() != null && jt.getPostsql().length() > 0) {
                String strPostSql[] = jt.getPostsql().split(",");
                for (String sql : strPostSql) {
                    System.out.println("the post sql is===>"+sql);
                    if (sql.trim().length() != 0) {
                        stmtDes.addBatch(sql.replaceAll("'", ""));
                    }
                }
            }
            String filter_columns[] = {};
            if(jt.getColumn_name()!=null && jt.getColumn_name().length()>0)
                filter_columns = jt.getColumn_name().split(",");
            String symbol="";
            if(filter_columns.length>0){
                String filter_sql="update "+jt.getJob_table()+" set ";
                for(String data:filter_columns){
                    filter_sql+=symbol+data+"='****'";
                    symbol=",";
                }
                System.out.println("filter_sql is===="+filter_sql);
                stmtDes.addBatch(filter_sql);
            }
            stmtDes.executeBatch();
            connDes.commit();
            System.out.println("结束时间：" + System.nanoTime());
            stmtDes.close();
            connDes.close();
            stmt.close();
            conn.close();
            return 1;
        } catch (Exception e) {
            e.printStackTrace();
            return 0;
        } finally {
            try {
                if (stmt != null) {
                    stmt.close();
                }
                if (conn != null) {
                    conn.close();
                }
            } catch (Exception e) {
                e.printStackTrace();
                return 0;
            }
            System.out.println("start_time is====>" + start_time);
            System.out.println("end_time is===>" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
        }
    }
}